write multiple documents in a single operation #467
Conversation
src/write-queue.ts
Outdated
```diff
@@ -21,7 +23,7 @@ export function writeQueue<T extends DocTypes>(worker: WorkerFunction<T>, payloa
   isProcessing = true;

   const tasksToProcess = queue.splice(0, payload);
-  const updates = tasksToProcess.map((item) => item.task);
+  const updates = tasksToProcess.flatMap((item) => item.task || item.tasks || []);
```
this is ugly; why not just have `task` be `DocUpdate<T> | DocUpdate<T>[]`?
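The reviewer's suggestion can be sketched as follows. This is illustrative only: `DocUpdate` and the queue item shape here are simplified stand-ins, not the library's actual definitions.

```typescript
// Simplified stand-ins for the library's types (assumptions for illustration).
interface DocUpdate<T> {
  readonly id: string;
  readonly value: T;
}

interface QueueItem<T> {
  // Union instead of two optional properties (the reviewer's proposal).
  readonly task: DocUpdate<T> | DocUpdate<T>[];
}

// Normalize once instead of probing both `task` and `tasks` with flatMap.
function toUpdates<T>(item: QueueItem<T>): DocUpdate<T>[] {
  return Array.isArray(item.task) ? item.task : [item.task];
}

const single: QueueItem<string> = { task: { id: "a", value: "one" } };
const multi: QueueItem<string> = {
  task: [
    { id: "b", value: "two" },
    { id: "c", value: "three" },
  ],
};

console.log(toUpdates(single).length); // 1
console.log(toUpdates(multi).length); // 2
```

The union keeps the call site uniform: every consumer sees an array after one `Array.isArray` check, so the `item.task || item.tasks || []` fallback chain disappears.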
Walkthrough

The pull request introduces enhancements to the document management system, focusing on bulk document operations and write queue configuration. The changes span multiple files.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Ledger
    participant WriteQueue
    participant Worker
    Client->>Ledger: bulk(documents)
    Ledger->>WriteQueue: bulk(documents)
    WriteQueue-->>WriteQueue: Process in chunks
    loop Process Tasks
        WriteQueue->>Worker: Process task batch
        Worker-->>WriteQueue: Task results
    end
    WriteQueue-->>Ledger: Bulk operation response
    Ledger-->>Client: Document IDs and metadata
```
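The chunked processing in the diagram can be sketched as below. This is a simplified stand-in: the real WriteQueue resolves a promise per queued task and is driven asynchronously, while this version just drains an array in batches.

```typescript
// A worker receives one batch of tasks and returns one result per task.
type Worker<T> = (batch: T[]) => Promise<string[]>;

// Drain the queue chunkSize items at a time, as the diagram's loop shows.
async function processInChunks<T>(
  queue: T[],
  chunkSize: number,
  worker: Worker<T>,
): Promise<string[]> {
  const results: string[] = [];
  while (queue.length > 0) {
    const batch = queue.splice(0, chunkSize); // take at most chunkSize tasks
    results.push(...(await worker(batch)));
  }
  return results;
}

// Hypothetical worker that just derives an id per task.
const worker: Worker<number> = async (batch) => batch.map((n) => `id-${n}`);

processInChunks([1, 2, 3, 4, 5], 2, worker).then((ids) => console.log(ids.length)); // 5
```

With `chunkSize: 1` every task is its own batch, which is why that setting yields strictly time-ordered writes.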
Actionable comments posted: 1
🧹 Nitpick comments (8)
src/write-queue.ts (5)
14-14: Consider accepting both single and multiple tasks in one property.
While this approach is valid, you may explore a type union for the property (e.g., `task?: DocUpdate<T> | DocUpdate<T>[]`) to unify the handling.
26-32: Check for valid configuration in defaultWriteQueueOpts.
Consider validating that `opts.chunkSize > 0` or throwing an error for invalid input, instead of silently accepting it.
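A minimal sketch of what such validation could look like. The function name mirrors the PR's `defaultWriteQueueOpts`, but this is an illustration of the suggestion, not the shipped code.

```typescript
interface WriteQueueParams {
  readonly chunkSize: number;
}

// Illustrative variant that rejects invalid input instead of silently
// falling back (the reviewer's suggestion; not the library's behavior).
function defaultWriteQueueOpts(opts: Partial<WriteQueueParams> = {}): WriteQueueParams {
  if (opts.chunkSize !== undefined && opts.chunkSize <= 0) {
    throw new Error(`chunkSize must be a positive integer, got ${opts.chunkSize}`);
  }
  return { chunkSize: opts.chunkSize ?? 32 };
}

console.log(defaultWriteQueueOpts({}).chunkSize); // 32
console.log(defaultWriteQueueOpts({ chunkSize: 4 }).chunkSize); // 4
```

Throwing early surfaces misconfiguration at setup time, whereas the silent-fallback approach (used later in this review's diffs) favors resilience over strictness.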
34-47: Potential constructor safeguard.
You might validate or sanitize `opts` before assignment, ensuring `chunkSize` is within a safe range. Also consider logging if an unexpected value is received.
78-82: Bulk method design is coherent.
The logic is aligned with the new queue structure. Consider adding optional concurrency controls if large bulk operations can overwhelm downstream processes.
84-89: Push method is consistent with bulk.
Both methods unify well under the same queue-based approach. Consider factoring out shared logic to avoid duplication if it grows more complex.

tests/fireproof/ledger.test.ts (3)
236-268: Consider adding error scenario test cases.
The test suite verifies the happy path for ordered writes, but it would be beneficial to add test cases for error scenarios, such as:
- Network failures during writes
- Concurrent writes with conflicts
- Invalid document formats
```ts
it("should handle write failures gracefully", async () => {
  // Mock a network failure
  const failingDoc = { _id: "fail-test", hello: "world" };
  // ... test implementation
});
```
329-330: Remove commented console.log statement.
Clean up the test code by removing the commented debug statement.
```diff
 rows.sort((a, b) => a.key.localeCompare(b.key));
-// console.log(rows);
 for (let i = 0; i < 10; i++) {
```
Line range hint 236-330: Consider implementing a shared test factory for write queue configurations.
The two test suites share similar setup and verification logic but with different chunk sizes. Consider creating a shared test factory that can run the same test cases with different configurations, to reduce code duplication and make it easier to test additional configurations in the future.
Example approach:
```ts
const testWriteQueue = (description: string, config: { chunkSize: number }) => {
  describe(`basic Ledger parallel writes / ${description}`, () => {
    // Shared test setup and cases
  });
};

testWriteQueue("public ordered", { chunkSize: 1 });
testWriteQueue("public parallel", { chunkSize: 32 });
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/ledger.ts (9 hunks)
- src/types.ts (3 hunks)
- src/write-queue.ts (1 hunks)
- tests/fireproof/fireproof.test.ts (1 hunks)
- tests/fireproof/ledger.test.ts (2 hunks)
🔇 Additional comments (20)
src/write-queue.ts (8)
1-2: Imports look good.
No issues spotted with these import statements.

9-10: New methods in WriteQueue interface look appropriate.
The addition of the `bulk` and `close` methods aligns with the PR objective to handle multiple documents and finalize the queue's work.

20-24: Parameter validation suggestion.
Ensure that bare-minimum constraints (e.g., `chunkSize >= 1`) are enforced or documented, preventing unintended behavior (like `splice(0, 0)`) when `chunkSize` is zero or negative.

55-60: Concurrency note.
When multiple async calls arrive while `isProcessing` is false, they will all call `process` concurrently. This is effectively throttled by the `isProcessing` guard, so it should be fine, but keep an eye out for potential race conditions or missed edge cases.
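The guard described above can be illustrated with a minimal synchronous sketch (illustrative only; the real implementation is async and resolves per-task promises): a reentrant call made while a drain is in flight returns immediately instead of starting a parallel drain.

```typescript
let isProcessing = false;
let concurrentDrains = 0;
let maxConcurrentDrains = 0;
const queue: number[] = [1, 2, 3];

function process(): void {
  // The guard: bail out if a drain is already running or there is no work.
  if (isProcessing || queue.length === 0) return;
  isProcessing = true;
  concurrentDrains += 1;
  maxConcurrentDrains = Math.max(maxConcurrentDrains, concurrentDrains);
  while (queue.length > 0) {
    queue.shift();
    process(); // reentrant call: bounced by the guard above
  }
  concurrentDrains -= 1;
  isProcessing = false;
}

process();
console.log(maxConcurrentDrains); // 1 — the guard prevented nested drains
console.log(queue.length); // 0 — the queue was fully drained
```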
66-67: Cleanly handling successful resolution.
Good job returning the result from the worker and fulfilling the promise in a structured way.

69-69: Error handling is well structured.
Rejecting tasks individually with error logs is a solid approach, ensuring that one failing task doesn't block the entire batch.

90-94: Graceful shutdown.
Returning a Promise that resolves when the queue is empty is a clean design. Ensure that potential unhandled rejections or errors don't prevent resolving.
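The drain-then-resolve pattern praised above can be sketched as follows. `Future` here is a minimal stand-in for the library's deferred class, and the function names are illustrative.

```typescript
// Minimal deferred: a promise whose resolver is exposed.
class Future<T> {
  resolve!: (value: T) => void;
  readonly promise: Promise<T>;
  constructor() {
    this.promise = new Promise<T>((res) => (this.resolve = res));
  }
}

const pending: string[] = [];
let waitForEmptyQueue: Future<void> | undefined;

// Called whenever the queue might have drained.
function testEmptyQueue(): void {
  if (waitForEmptyQueue && pending.length === 0) {
    waitForEmptyQueue.resolve(undefined);
  }
}

// close() arms the deferred and returns it; it resolves once the queue is empty.
function close(): Promise<void> {
  waitForEmptyQueue = new Future<void>();
  testEmptyQueue(); // resolves immediately if nothing is queued
  return waitForEmptyQueue.promise;
}

close().then(() => console.log("queue drained"));
```

The key property is that `close()` never polls: the processing loop calls `testEmptyQueue()` after each batch, so the promise settles exactly when the last task finishes.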
97-98: Factory function is straightforward.
No suggestions here; returning the implementation directly is concise and clear.
src/ledger.ts (8)
3-3: Imports are correct and consistent.
These newly included items align with the new write queue features.

18-18: BulkResponse type usage.
Ensuring references to BulkResponse remain consistent across files is crucial. No issue spotted.

53-53: LedgerOpts now requires writeQueue.
Be sure that external usages of LedgerOpts construct objects with this property, or fall back to a default.

79-79: New bulk method in Ledger interface.
This seamlessly extends the Ledger capabilities for multi-document operations.

113-113: Using defaultWriteQueueOpts is a good fallback approach.
This helps ensure chunkSize and other settings are kept stable.

169-171: Delegating ledgerShell bulk to ledgerImpl.
Clean pass-through design. The separation of concerns is clear.

236-236: Ensuring queue is closed upon shellClose.
This is essential to gracefully release resources. Good approach.

269-269: Customized worker for multi-doc CRDT updates.
Tying the ledger's CRDT `bulk` method to the queue's worker is a nice example of composable architecture.
src/types.ts (3)
9-9: Cross-file import references.
This import from write-queue.js ensures type consistency across modules.

122-122: Optional Partial.
Allowing partial overrides is flexible. Just ensure defaults are safely provided.

306-310: BulkResponse interface additions.
Straightforward structure that matches usage in ledger and queue logic. Looks good.
tests/fireproof/fireproof.test.ts (1)
142-150: Bulk operation test coverage is good.
Including multiple documents and verifying retrieval ensures functionality across the entire workflow. Consider also testing error handling (e.g., invalid input) in future tests.
```ts
it("should resolve to one head", async () => {
  const crdt = db.crdt;
  expect(crdt.clock.head.length).toBe(9);
  await db.put({ _id: "id-10", hello: "world" });
  expect(crdt.clock.head.length).toBe(1);
});
```
🛠️ Refactor suggestion
Avoid hardcoding expected head count assumptions.
The test assumes specific head counts (9 and 1) which could make the test brittle. Consider making the assertions more flexible to accommodate implementation changes while still verifying the core behavior - that multiple heads eventually resolve to one.
```diff
-expect(crdt.clock.head.length).toBe(9);
+expect(crdt.clock.head.length).toBeGreaterThan(1);
 await db.put({ _id: "id-10", hello: "world" });
 expect(crdt.clock.head.length).toBe(1);
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
it("should resolve to one head", async () => {
  const crdt = db.crdt;
  expect(crdt.clock.head.length).toBeGreaterThan(1);
  await db.put({ _id: "id-10", hello: "world" });
  expect(crdt.clock.head.length).toBe(1);
});
```
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/write-queue.ts (1)
55-76: Consider enhancing error handling and process scheduling.
Two potential improvements for the process method:
- Consider adding batch-level error handling to catch and log any unexpected errors during processing.
- Replace the recursive `void this.process()` call with a non-recursive approach to prevent potential stack overflow with large queues.

```diff
-  void this.process();
+  setImmediate(() => void this.process());
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/ledger.ts (9 hunks)
- src/types.ts (3 hunks)
- src/write-queue.ts (1 hunks)
- tests/fireproof/fireproof.test.ts (1 hunks)
- tests/fireproof/ledger.test.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/fireproof/fireproof.test.ts
- src/types.ts
🔇 Additional comments (9)
src/write-queue.ts (5)
20-32: LGTM! Well-documented interface and sensible defaults.
The `WriteQueueParams` interface and `defaultWriteQueueOpts` function are well-designed with clear documentation and reasonable defaults.

34-46: LGTM! Well-structured class implementation.
The class is well-organized with proper initialization and logging setup.

48-53: LGTM! Proper queue state management.
The queue state tracking is well-implemented using Future for async operations.

78-89: LGTM! Consistent implementation of bulk and push operations.
The methods are well-implemented with consistent patterns for task queueing.

90-98: LGTM! Clean implementation of close method and factory.
The close method properly manages queue cleanup, and the factory function follows good practices.
src/ledger.ts (2)
52-53: LGTM! Well-integrated write queue configuration.
The writeQueue configuration is properly integrated into the LedgerOpts interface.

305-320: LGTM! Robust implementation of bulk document operations.
The bulk method properly handles document processing with appropriate ID generation and error propagation.
tests/fireproof/ledger.test.ts (2)
236-268: LGTM! Comprehensive tests for ordered write operations.
The test suite properly verifies the behavior of ordered writes, including head count and change order.

287-292: Avoid hardcoding expected head count assumptions.
The test assumes specific head counts (9 and 1), which could make the test brittle. Consider making the assertions more flexible to accommodate implementation changes while still verifying the core behavior.
Actionable comments posted: 0
🧹 Nitpick comments (1)
tests/fireproof/crdt.test.ts (1)
466-466: Applying default queue in Loader with many transactions.
Ensures concurrency patterns remain uniform in large-scale commit tests. Keep an eye on potential throughput or memory constraints when scaling.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/index.ts (1 hunks)
- tests/fireproof/crdt.test.ts (10 hunks)
- tests/fireproof/indexer.test.ts (2 hunks)
🔇 Additional comments (12)
src/index.ts (1)
8-8: Looks good!
The export statement cleanly re-exports the default queue options for external usage. This is consistent with the other export patterns in the file. No further changes needed.
tests/fireproof/indexer.test.ts (2)
1-13: Use of defaultWriteQueueOpts in the import block.
The addition of defaultWriteQueueOpts is consistent with the updated write queue configuration and helps maintain a standardized approach to queuing. No issues found here.

293-293: Ensuring proper usage of defaultWriteQueueOpts.
By assigning `writeQueue: defaultWriteQueueOpts({})`, you align this test with the new default write queue configuration. Verify that any custom write queue settings are, indeed, intentionally left at defaults.
✅ Verification successful
Let me gather information about the usage of `defaultWriteQueueOpts` to verify this concern.
Based on the search results, I can now provide a final response:

Default write queue configuration is consistent with test patterns
The usage of `defaultWriteQueueOpts({})` in this test file follows the established pattern across the test suite. The search results show that this is the standard configuration used in most test cases, with only specific performance-related tests (`test-parallel-writes` and `test-parallel-writes-ordered`) explicitly overriding the defaults. The default configuration sets a `chunkSize` of 32, which is appropriate for general testing scenarios.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Search for defaultWriteQueueOpts definition and usage
rg "defaultWriteQueueOpts" -A 5

# Search for writeQueue configurations in test files
rg "writeQueue:" --type ts -A 3 tests/
```
Length of output: 10068
tests/fireproof/crdt.test.ts (9)
1-1: New import for defaultWriteQueueOpts.
The import of defaultWriteQueueOpts introduces a unified way to configure write queues for your CRDT tests. Looks good.

61-61: Reusing the same default write queue configuration.
This standardization simplifies test maintenance. No immediate concerns.

117-117: Consistent defaultWriteQueueOpts usage for multi-writes.
Retaining consistency helps cover multi-write scenarios uniformly. All good.

188-188: Checking default queue opts in the two-multi-writes scenario.
Ensures that multiple batch operations use the same queue settings. No issues found.

304-304: Index-related test also applying defaultWriteQueueOpts.
Maintains a unified testing context. No further action needed.

410-410: Second Loader test with default queue.
Great to see consistent usage. No issues surfaced.

242-242: Extended usage of default write queue in compaction scenario.
Makes compaction tests consistent with the rest of CRDT tests. Review for performance in large data sets.
✅ Verification successful
Let me gather information about the write queue usage and compaction tests to verify this concern.
Based on the search results, I can now generate the final response:

Default write queue configuration is consistently used across CRDT tests, including compaction scenarios
The codebase shows consistent usage of `defaultWriteQueueOpts({})` across all CRDT test scenarios, including compaction tests. The write queue configuration is uniform, and there's no evidence of performance issues in large dataset tests. The compaction tests are properly integrated with the same write queue settings as other CRDT operations.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Search for writeQueue usage in test files
rg "writeQueue" tests/ -B 2 -A 2

# Search for compaction related tests and their write queue configuration
rg "compaction|compact" tests/ -B 2 -A 2

# Look for defaultWriteQueueOpts definition and usage
ast-grep --pattern 'defaultWriteQueueOpts'
```
Length of output: 21702
18-18: Applying default write queue options.
Using defaultWriteQueueOpts for dbOpts ensures consistent behavior across tests. Confirm that these defaults meet the test scenarios' performance expectations.

✅ Verification successful
Let me search for information about `defaultWriteQueueOpts` to understand its implementation and usage across tests.
Let me check the implementation of the write queue options to understand its parameters better.
Based on the gathered information, I can now provide a final response:

Default write queue configuration is appropriate for test scenarios.
The `defaultWriteQueueOpts` sets a `chunkSize` of 32, which is suitable for test scenarios as it provides a good balance between performance and memory usage. The test file correctly uses these defaults, and the test suite includes specific tests with different chunk sizes (as seen in ledger.test.ts) to verify behavior under various configurations. No changes needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Search for defaultWriteQueueOpts definition and usage
rg "defaultWriteQueueOpts" -A 5

# Also search for writeQueue configurations in test files
rg "writeQueue" tests/ -A 3
```
Length of output: 10057
Script:

```bash
#!/bin/bash
# Look for WriteQueueParams interface and related configurations
cat src/write-queue.ts
```
Length of output: 3270
356-356: Loader test with default write queue.
This ensures write behavior remains aligned with the rest of the codebase. Confirm any special loader edge cases are still valid.

✅ Verification successful
Let me search for related test cases and write queue implementations to verify this concern.
Let me search for the specific loader test implementation to understand the edge cases being tested.
Based on the gathered information, I can now provide the final response:

Write queue configuration is correctly tested for loader scenarios
The test suite thoroughly verifies the write queue behavior with loader operations through multiple test cases:
- Single transaction commits are verified with default chunk size
- Multiple transaction commits are tested to ensure proper queuing
- Car file loading and metadata integrity are validated
- The default write queue options (chunk size: 32) are consistently applied across all loader tests
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:

```bash
#!/bin/bash
# Search for write queue related tests and implementations
rg -l "writeQueue" --type ts

# Look for loader test cases
rg "loader.*test" --type ts

# Search for defaultWriteQueueOpts usage
rg "defaultWriteQueueOpts" -A 5 --type ts
```
Length of output: 7022
Script:

```bash
#!/bin/bash
# Get the loader test implementation and surrounding context
rg -B 5 -A 20 "test-loader" tests/fireproof/crdt.test.ts

# Check write queue implementation to understand default behavior
cat src/write-queue.ts
```
Length of output: 7462
Actionable comments posted: 1
🧹 Nitpick comments (4)
src/write-queue.ts (4)
20-24: Enhance WriteQueueParams documentation.
The interface documentation should clarify:
- The purpose and impact of chunkSize
- Valid range of values
- Performance implications

```diff
 export interface WriteQueueParams {
-  // default 32
-  // if chunkSize is 1 the result will be ordered in time
+  /**
+   * Maximum number of tasks to process in a single batch.
+   * - Default: 32
+   * - Set to 1 for strict time-ordered processing
+   * - Higher values improve throughput but may affect ordering
+   * @minimum 1
+   */
   readonly chunkSize: number;
 }
```
14-15: Consider using union type for tasks as suggested.
The current implementation uses an optional tasks array, but a union type might be clearer, as suggested in the previous review.

```diff
 interface WriteQueueItem<T extends DocTypes> {
-  // readonly task?: DocUpdate<T>;
-  readonly tasks?: DocUpdate<T>[];
+  readonly task: DocUpdate<T> | DocUpdate<T>[];
   resolve(result: MetaType): void;
   reject(error: Error): void;
 }
```
55-76: Enhance process method robustness.
The current implementation has several areas for improvement:
- No timeout handling for worker operations
- Memory usage could grow unbounded without queue size limits
- Error handling could be more informative

Consider implementing:
- Worker timeout mechanism
- Queue size limits
- Detailed error reporting

Example implementation:

```ts
private readonly MAX_QUEUE_SIZE = 10000;
private readonly WORKER_TIMEOUT = 30000; // 30 seconds

private async process() {
  if (this.isProcessing || this.queue.length === 0) {
    this.testEmptyQueue();
    return;
  }
  if (this.queue.length > this.MAX_QUEUE_SIZE) {
    this.logger.Warn().Int("queueSize", this.queue.length).Msg("Queue size exceeds limit");
  }
  this.isProcessing = true;
  const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
  const processWithTimeout = async (update: DocUpdate<T>[], index: number) => {
    const timeoutPromise = new Promise((_, reject) => {
      setTimeout(() => reject(new Error("Worker timeout")), this.WORKER_TIMEOUT);
    });
    try {
      const result = await Promise.race([this.worker(update), timeoutPromise]);
      tasksToProcess[index].resolve(result);
    } catch (error) {
      const enhancedError = new Error(`Task processing failed: ${error.message}`, { cause: error });
      tasksToProcess[index].reject(enhancedError);
    }
  };
  // ... rest of the implementation
}
```
78-94: Consider adding queue management features.
The current implementation could benefit from additional queue management capabilities:
- Queue status monitoring
- Ability to pause/resume processing
- Queue statistics

Example additions:

```ts
interface WriteQueue<T extends DocTypes> {
  // ... existing methods ...

  /** Get current queue status */
  status(): { length: number; isProcessing: boolean; isPaused: boolean };

  /** Pause processing */
  pause(): void;

  /** Resume processing */
  resume(): void;
}
```
src/write-queue.ts
Outdated
```ts
constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
  this.logger = ensureLogger(sthis, "WriteQueueImpl");
  this.worker = worker;
  this.opts = opts;
}
```
🛠️ Refactor suggestion
Add validation for WriteQueueParams.
The constructor should validate the chunkSize parameter to ensure it's positive and within reasonable bounds.
```diff
 constructor(sthis: SuperThis, worker: WorkerFunction<T>, opts: WriteQueueParams) {
   this.logger = ensureLogger(sthis, "WriteQueueImpl");
   this.worker = worker;
+  if (opts.chunkSize < 1) {
+    throw new Error("chunkSize must be at least 1");
+  }
   this.opts = opts;
 }
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/write-queue.ts (3)
14-14: Remove commented code.
Remove the commented line, as it's no longer needed and can cause confusion.

```diff
-  // readonly task?: DocUpdate<T>;
```
26-31: Consider adding upper bound validation and documentation.
While the implementation handles the lower bound well, consider these improvements:
- Add an upper bound for `chunkSize` to prevent potential memory issues with very large chunks
- Document why 32 was chosen as the default value

```diff
 export function defaultWriteQueueOpts(opts: Partial<WriteQueueParams> = {}): WriteQueueParams {
+  const DEFAULT_CHUNK_SIZE = 32;
+  const MAX_CHUNK_SIZE = 1000; // or another appropriate limit
+
+  let chunkSize = opts.chunkSize || DEFAULT_CHUNK_SIZE;
+  if (chunkSize <= 0) {
+    chunkSize = DEFAULT_CHUNK_SIZE;
+  } else if (chunkSize > MAX_CHUNK_SIZE) {
+    chunkSize = MAX_CHUNK_SIZE;
+  }
+
   return {
     ...opts,
-    chunkSize: opts.chunkSize && opts.chunkSize > 0 ? opts.chunkSize : 32,
+    chunkSize,
   };
 }
```
86-90: Improve cleanup in close method.
The `close` method should clear any pending tasks to prevent memory leaks.

```diff
 close(): Promise<void> {
   this.waitForEmptyQueue = new Future();
   this.testEmptyQueue();
+  // Clear any pending tasks to prevent memory leaks
+  this.queue.length = 0;
   return this.waitForEmptyQueue.asPromise();
 }
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/write-queue.ts (1 hunks)
🔇 Additional comments (3)
src/write-queue.ts (3)
9-10: LGTM! Well-structured interface changes.
The new interfaces and methods are well-designed:
- The `bulk` method aligns with the PR objective of handling multiple documents
- The `close` method provides proper cleanup
- The `WriteQueueParams` interface is well-documented

Also applies to: 20-24

93-94: LGTM! Clean factory function implementation.
The factory function is well-designed and properly typed.

1-94: Verify bulk operation implementation across the codebase.
The implementation aligns well with the PR objective of enabling multiple document writes in a single operation. Let's verify the integration points.
✅ Verification successful
Based on the comprehensive results, I can now provide the final response:

Bulk operation implementation is correctly integrated and thread-safe
The implementation shows proper integration:
- Single write operations are converted to bulk operations (`push` calls `bulk([task])`)
- Queue processing is synchronized with the `isProcessing` flag
- Chunked processing with configurable size
- Proper error handling and promise resolution
- Tests verify both single and bulk operations

The implementation is used correctly across the codebase:
- In `ledger.ts` for document operations
- In `crdt.ts` for handling document updates
- In `indexer-helpers.ts` for index updates
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```bash
#!/bin/bash
# Description: Verify the integration of bulk operations across the codebase

# Check for bulk operation usage patterns
echo "Checking bulk operation usage patterns..."
rg -t typescript "bulk\(" --context 3

# Check for WriteQueueParams configuration
echo "Checking WriteQueueParams configuration..."
rg -t typescript "WriteQueueParams" --context 2

# Check for potential race conditions in concurrent bulk operations
echo "Checking for potential race conditions..."
ast-grep --pattern 'bulk($tasks) {
  $$$
  this.queue.push($$$)
  $$$
}'
```
Length of output: 513
Script:

```bash
#!/bin/bash
# Let's try without typescript file type specification
echo "Checking bulk operation usage patterns..."
rg "bulk\(" -A 3

echo "Checking WriteQueueParams configuration..."
rg "WriteQueueParams"

echo "Checking write queue usage..."
rg "writeQueue\(" -A 3

# Check for any potential concurrent write operations
echo "Checking concurrent write patterns..."
rg "push\(" -A 3
```
Length of output: 24383
src/write-queue.ts
Outdated
```diff
 private async process() {
   if (this.isProcessing || this.queue.length === 0) {
     this.testEmptyQueue();
     return;
   }
   this.isProcessing = true;
   this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
   const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
   const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
   const promises = updates.map(async (update, index) => {
     try {
-      const result = await worker(updates);
-      tasksToProcess.forEach((task) => task.resolve(result));
+      const result = await this.worker(update);
+      tasksToProcess[index].resolve(result);
     } catch (error) {
-      tasksToProcess.forEach((task) => task.reject(error as Error));
+      tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
     }
   });
   await Promise.allSettled(promises);
   this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
   this.isProcessing = false;
   setTimeout(() => this.process(), 0);
 }
```
🛠️ Refactor suggestion
Improve error handling and processing robustness.
The `process` method has potential issues:
- If an error occurs during processing, `isProcessing` might stay true, causing a deadlock
- Using `setTimeout(fn, 0)` for recursive processing could be improved
```diff
 private async process() {
   if (this.isProcessing || this.queue.length === 0) {
     this.testEmptyQueue();
     return;
   }
   this.isProcessing = true;
+  try {
     this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
     const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
     const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
     const promises = updates.map(async (update, index) => {
       try {
         const result = await this.worker(update);
         tasksToProcess[index].resolve(result);
       } catch (error) {
         tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
       }
     });
     await Promise.allSettled(promises);
     this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
+  } catch (error) {
+    this.logger.Error().Err(error).Msg("Fatal error in process loop");
+  } finally {
     this.isProcessing = false;
-    setTimeout(() => this.process(), 0);
+    queueMicrotask(() => this.process());
+  }
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
private async process() {
  if (this.isProcessing || this.queue.length === 0) {
    this.testEmptyQueue();
    return;
  }
  this.isProcessing = true;
  try {
    this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processing tasks");
    const tasksToProcess = this.queue.splice(0, this.opts.chunkSize);
    const updates = tasksToProcess.map((item) => item.tasks).filter((item) => item) as DocUpdate<T>[][];
    const promises = updates.map(async (update, index) => {
      try {
        const result = await this.worker(update);
        tasksToProcess[index].resolve(result);
      } catch (error) {
        tasksToProcess[index].reject(this.logger.Error().Err(error).Msg("Error processing task").AsError());
      }
    });
    await Promise.allSettled(promises);
    this.logger.Debug().Any("opts", this.opts).Len(this.queue).Msg("Processed tasks");
  } catch (error) {
    this.logger.Error().Err(error).Msg("Fatal error in process loop");
  } finally {
    this.isProcessing = false;
    queueMicrotask(() => this.process());
  }
}
```
this allows inserting multiple records into a single CRDT operation. For some workloads this may be a benefit, as larger datasets can produce fewer log entries.
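A toy model of that claim, assuming one log entry per committed batch (an assumption made for illustration, not a measurement of Fireproof):

```typescript
// If each worker invocation commits one batch as a single CRDT operation,
// the number of log entries is the number of batches, not the number of docs.
function logEntries(totalDocs: number, chunkSize: number): number {
  return Math.ceil(totalDocs / chunkSize);
}

console.log(logEntries(100, 1)); // 100 entries when written one at a time
console.log(logEntries(100, 32)); // 4 entries when batched with chunkSize 32
```

The trade-off is the one discussed throughout the review: `chunkSize: 1` preserves strict write ordering at the cost of more entries, while larger chunks trade ordering guarantees for fewer, larger commits.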
Summary by CodeRabbit

New Features
- `bulk` method for processing multiple documents in the ledger.
- `WriteQueue` with bulk processing capabilities and improved error handling.

Bug Fixes
- Updated `shellClose` method.

Tests
- Tests for the new `bulk` method in the ledger.
- Expanded `Ledger` functionality tests.
- Updated `CRDT` test cases.